<?php
declare (strict_types = 1);

namespace app\home\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Worker;

class Workerman extends Command
{
    // php think workerman server
    protected function configure()
    {
        // 指令配置
        $this->setName('workerman')
            ->addArgument('name', Argument::OPTIONAL, "your task name")
            ->setDescription('the workerman command');
    }

    protected function execute(Input $input, Output $output)
    {
        $name = $input->getArgument('name');
        $name = $name ? trim($name) : 'client';
        if ($name == 'client') {
            $this->client();
        } else {
            $this->server();
        }

        // 指令输出
        $output->writeln('workerman');
    }

    private function http()
    {
        // 创建一个Worker监听2345端口，使用http协议通讯
        $http_worker = new Worker("http://0.0.0.0:2345");

        // 启动4个进程对外提供服务
        $http_worker->count = 4;

        // 接收到浏览器发送的数据时回复hello world给浏览器
        $http_worker->onMessage = function($connection, $data)
        {
            // 向浏览器发送hello world
            $connection->send('hello world');
        };

        // 运行worker
        Worker::runAll();
    }

    private function tcp()
    {
        // 创建一个Worker监听2347端口，不使用任何应用层协议
        $tcp_worker = new Worker("tcp://0.0.0.0:2347");

        // 启动4个进程对外提供服务
        $tcp_worker->count = 4;

        // 当客户端发来数据时
        $tcp_worker->onMessage = function($connection, $data)
        {
            // 向客户端发送hello $data
            $connection->send('hello ' . $data);
        };

        // 运行worker
        Worker::runAll();
    }

    private function wsServer()
    {
        $ws_worker = new Worker("websocket://0.0.0.0:2345");
        // 启动4个进程对外提供服务
        $ws_worker->count = 1;

        // 当收到客户端发来的数据后返回hello $data给客户端
        $ws_worker->onMessage = function($connection, $data)
        {
            $data = json_decode($data, true);
            static $conn = [];
            if (!isset($conn[$data['uid']])) {
                $conn[$data['uid']] = $connection;
            }
            if (isset($conn[$data['user_id']])) {
                $conn[$data['user_id']]->send($data['message']);
            }
        };

        // 运行worker
        Worker::runAll();
    }

    /**
     * @return void
     */
    private function wsClient()
    {
        $worker = new Worker();

        $worker->onWorkerStart = function($worker){

            $con = new AsyncTcpConnection('ws://127.0.0.1:8282');

            $con->onConnect = function($con) {
                $data = [
                    'type'    => 'one',
                    'user_id' => 1,
                    'message' => 'hello'
                ];
                $con->send(json_encode($data));
            };

            $con->onMessage = function($con, $data) {
                echo $data;
            };

            $con->connect();
        };

        Worker::runAll();
    }

    /**
     * async task server
     * @return void
     */
    private function server()
    {
        // task worker，使用Text协议
        $task_worker = new Worker('Text://0.0.0.0:12345');
        // task进程数可以根据需要多开一些
        $task_worker->count = 100;
        $task_worker->name = 'TaskWorker';
        $task_worker->onMessage = function($connection, $task_data)
        {
            // 假设发来的是json数据
            $task_data = json_decode($task_data, true);
            // 根据task_data处理相应的任务逻辑.... 得到结果
            switch ($task_data['function']) {
                case 'send_mail':

                    break;


            }
            $task_result = ['code' => 0, 'message' => '', 'data' => []];
            // 发送结果
            $connection->send(json_encode($task_result));
        };
        Worker::runAll();
    }

    /**
     * async task client
     * @return void
     */
    private function client()
    {
        // websocket服务
        $worker = new Worker('websocket://0.0.0.0:8080');

        $worker->onMessage = function($ws_connection, $message) {
            // 与远程task服务建立异步连接，ip为远程task服务的ip，如果是本机就是127.0.0.1，如果是集群就是lvs的ip
            $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
            // 任务及参数数据
            $task_data = array(
                'function' => 'send_mail',
                'args'     => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
            );
            // 发送数据
            $task_connection->send(json_encode($task_data));
            // 异步获得结果
            $task_connection->onMessage = function($task_connection, $task_result)use($ws_connection)
            {
                // 结果
                var_dump($task_result);
                // 获得结果后记得关闭异步连接
                $task_connection->close();
                // 通知对应的websocket客户端任务完成
                $ws_connection->send('task complete');
            };
            // 执行异步连接
            $task_connection->connect();
        };

        Worker::runAll();
    }
}
